feat: merge using partition filters #1958
Conversation
ACTION NEEDED delta-rs follows the Conventional Commits specification for release automation. The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.
@Blajda I know you have done a bunch of things around merge, this may be of interest.
@emcake this PR looks great. Good Job.
Yeah, with current Datafusion this is not possible. However, there are talks about supporting sideways information passing, which will essentially allow the build side of the hash join to pass information to the probe side and would enable this "dynamic" filtering.
I don't think this is required. We are already using a hash join so the entire build side (source) is loaded into memory. For these types of merges we will need to switch to using a sort merge join. For your generalize filter method I'd like to see additional unit tests written so it is easier to see how they get transformed. I will follow up with another optimization that saves rewriting unmodified files with changes from this branch.
Performance with the TPC-DS benchmark is on par with main. Before is main. After is this branch.
@Blajda @wjones127 @rtyler @roeap I think this is now ready for review.
@emcake Thanks for making the changes. Seems like the new unit tests are a bit flaky since the order of the disjunctions is not stable. One suggestion is to split the actual expression on |
I wondered if this would bite me! will do.
@Blajda this isn't as pretty as I would like, I basically have to deconstruct the predicate into its component parts. this means that it's more fragile with how the predicate is constructed, but it will definitely verify the current behaviour.
    })
    .reduce(Expr::or)
    .unwrap();
let split_pred = {
You don't need to implement this yourself. Datafusion provides a utility for this: https://docs.rs/datafusion/latest/datafusion/logical_expr/utils/fn.split_binary_owned.html
i'll use that for the first layer unpacking the ORs. but I think I still need to do manual mangling on unwrapping the EQ expressions, because Expr doesn't implement Ord so I need to extract the strings to sort them?
Ah alright if that's the case then I think this is good!
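For readers following the thread, the idea of comparing disjunctions independently of their order can be sketched roughly as below. This is only an illustration: `ToyExpr`, `split_or` and `sorted_disjuncts` are hypothetical names, and the toy enum stands in for an expression type that has `Debug` but no `Ord`; it is not the code in this PR and does not use DataFusion.

```rust
/// Hypothetical stand-in for an expression type that implements
/// `Debug` and `PartialEq` but not `Ord`.
#[derive(Debug, Clone, PartialEq)]
enum ToyExpr {
    /// column = literal
    Eq(String, String),
    /// left OR right
    Or(Box<ToyExpr>, Box<ToyExpr>),
}

/// Flatten nested ORs into their leaf disjuncts, in tree order.
fn split_or(expr: &ToyExpr, out: &mut Vec<ToyExpr>) {
    match expr {
        ToyExpr::Or(left, right) => {
            split_or(left, out);
            split_or(right, out);
        }
        leaf => out.push(leaf.clone()),
    }
}

/// Render each disjunct to a string and sort the strings, which gives
/// a comparison key that does not depend on the order of the ORs.
fn sorted_disjuncts(expr: &ToyExpr) -> Vec<String> {
    let mut parts = Vec::new();
    split_or(expr, &mut parts);
    let mut keys: Vec<String> = parts.iter().map(|p| format!("{:?}", p)).collect();
    keys.sort();
    keys
}
```

A test can then assert on `sorted_disjuncts(&actual)` against `sorted_disjuncts(&expected)` instead of comparing the two expressions directly.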
split_binary_owned is new to v34: https://docs.rs/datafusion/33.0.0/datafusion/?search=split_binary_owned
if #1983 makes it first then I can update it. :)
We can merge it now, I just want to give it some time before I press the merge button. If you want to wait until after the other PR let me know.
@emcake This PR looks good to me. Thanks again for making this contribution.
Seems like the tests that failed also fail on main.
Description
This upgrades merge so that it can leverage partitions where specified in the join predicate. There are two ways we can leverage partitions:
1. target.partition = 1
2. source.partition = target.partition
In the first case, this implements the logic described in this comment. Any predicate mentioning the source that is not covered by (2) is pruned, which will leave predicates on just the target columns (and will be amenable to file pruning).
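As a rough picture of the first case, the sketch below drops every conjunct that mentions the source and keeps the target-only ones. `Pred`, `mentions` and `prune_source` are hypothetical names on a toy predicate tree, not the types used in this PR, and the sketch only handles AND, where dropping a conjunct is safe because it can only widen the filter.

```rust
/// Toy predicate tree; a hypothetical stand-in for a real expression type.
#[derive(Debug, Clone, PartialEq)]
enum Pred {
    /// Column reference as (table, column).
    Col(&'static str, &'static str),
    /// Integer literal.
    Lit(i64),
    /// left = right
    Eq(Box<Pred>, Box<Pred>),
    /// left AND right
    And(Box<Pred>, Box<Pred>),
}

/// True if any column in the predicate belongs to `table`.
fn mentions(pred: &Pred, table: &str) -> bool {
    match pred {
        Pred::Col(t, _) => *t == table,
        Pred::Lit(_) => false,
        Pred::Eq(l, r) | Pred::And(l, r) => mentions(l, table) || mentions(r, table),
    }
}

/// Drop every conjunct that mentions the source, keeping target-only filters.
/// Returns `None` when no target-only filter remains.
fn prune_source(pred: &Pred) -> Option<Pred> {
    match pred {
        Pred::And(l, r) => match (prune_source(l), prune_source(r)) {
            (Some(l), Some(r)) => Some(Pred::And(Box::new(l), Box::new(r))),
            (Some(x), None) | (None, Some(x)) => Some(x),
            (None, None) => None,
        },
        other if mentions(other, "source") => None,
        other => Some(other.clone()),
    }
}
```

For a predicate such as source.id = target.id AND target.partition = 1, this leaves only target.partition = 1, which is the kind of filter that can be used for file pruning.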
In the second case, we first construct a version of the predicate with references to source replaced with placeholders:
becomes:
We then stream through the source table, gathering the distinct tuples of the mentioned partitions:
and then expand out the sql to take these into account:
And insert this filter into the target chain. We also use the same filter to process the file list, meaning we only make remove actions for files that will be targeted by the scan.
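The second case (placeholders, distinct tuples, expansion) can be sketched roughly as follows. This works on plain strings rather than expression trees, and the `$1` placeholder syntax, `PartitionTuple`, `distinct_partitions` and `expand_filter` are all assumptions made for illustration, not the names or representation used in this PR. The substitution is deliberately naive and does no escaping.

```rust
use std::collections::BTreeSet;

/// A source row reduced to the partition columns named in the predicate.
type PartitionTuple = Vec<String>;

/// Collect the distinct partition tuples seen in the source.
/// A BTreeSet is used so the expanded filter has a stable order.
fn distinct_partitions(rows: &[PartitionTuple]) -> BTreeSet<PartitionTuple> {
    rows.iter().cloned().collect()
}

/// Substitute `$1`, `$2`, ... in `template` with quoted values, producing
/// one clause per tuple, and OR the clauses together.
/// Returns `None` if there are no tuples.
fn expand_filter(template: &str, tuples: &BTreeSet<PartitionTuple>) -> Option<String> {
    tuples
        .iter()
        .map(|tuple| {
            let mut clause = template.to_string();
            // Replace the highest index first so `$1` does not clobber `$10`.
            for (i, value) in tuple.iter().enumerate().rev() {
                clause = clause.replace(&format!("${}", i + 1), &format!("'{}'", value));
            }
            format!("({})", clause)
        })
        .reduce(|a, b| format!("{} OR {}", a, b))
}
```

The resulting string only mentions target columns, so under these assumptions the same filter could serve both the target scan and the file list.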
I considered whether it would be possible to do this via datafusion sql in a generic manner, for example by first joining against the distinct partitions. I don't think it's possible: because each of the filters on the logical plans is static, there's no opportunity for it to push the distinct partition tuples down into the scan. Another variant would be to make it so the source and partition tables share the same output_partitioning structure, but as far as I can tell you wouldn't be able to make the partitions line up such that you can do the merge effectively and not read the whole table (plus DeltaScan doesn't guarantee that one datafusion partition is one DeltaTable partition).
I think the static bit is a no brainer but the eager read of the source table may cause issues if the source table is of a similar size to the target table. It may be prudent to hide that part behind a feature flag on the merge, but would love comments on it.
Performance
I created a 16GB table locally with 1.25 billion rows over 1k partitions, and when updating 1 partition a full merge takes 1000-ish seconds:
but with partitioning it takes about 3 seconds:
In practice, the tables I'm wanting to use this for are terabytes in size so using merge is currently impractical. This would be a significant speed boost to them.
Related Issue(s)
closes #1846